#  File src/library/parallel/R/unix/mcparallel.R
#  Part of the R package, https://www.R-project.org
#
#  Copyright (C) 1995-2019 The R Core Team
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  A copy of the GNU General Public License is available at
#  https://www.R-project.org/Licenses/

### Derived from multicore version 0.1-6 by Simon Urbanek

## Start a parallel job: call mcfork() and, on the side that receives a
## "masterProcess" object from it, evaluate 'expr' and terminate via mcexit().
##
## expr            expression to run; evaluated with eval() against the
##                 caller's frame.
## name            optional; if supplied and non-NULL, its first element (as
##                 character) is stored in the 'name' component of the result.
## mc.set.seed     if TRUE, mc.advance.stream() is called after the fork, and
##                 mc.set.stream() is called as well on the "masterProcess"
##                 side.
## silent          if TRUE, closeStdout(TRUE) is called before evaluation.
## mc.affinity     if non-NULL, handed to mcaffinity() before evaluation.
## mc.interactive  coerced with as.logical(); TRUE or FALSE is forwarded to
##                 mcinteractive(), NA leaves the setting alone.
## detached        handed to mcfork(); when true, 'expr' is evaluated without
##                 passing its value to sendMaster().
##
## Value: the object returned by mcfork() with class "parallelJob" prepended
## and, when requested, a 'name' component.
mcparallel <- function(expr, name, mc.set.seed = TRUE, silent = FALSE, mc.affinity = NULL, mc.interactive = FALSE, detached = FALSE)
{
    job <- mcfork(detached)
    caller <- parent.frame()
    reseed <- isTRUE(mc.set.seed)
    if (reseed) mc.advance.stream()

    if (inherits(job, "masterProcess")) {
        ## Safety net: if anything in the wrapper below fails, leave with
        ## exit code 1 and hand a try-error to mcexit().
        on.exit(mcexit(1L, structure("fatal error in wrapper code",
                                     class = "try-error")))
        if (reseed) mc.set.stream()

        ## Three-state switch: TRUE / FALSE are forwarded, NA is ignored.
        mc.interactive <- as.logical(mc.interactive)
        if (isTRUE(mc.interactive)) {
            mcinteractive(TRUE)
        } else if (isTRUE(!mc.interactive)) {
            mcinteractive(FALSE)
        }

        if (!is.null(mc.affinity)) mcaffinity(mc.affinity)
        if (isTRUE(silent)) closeStdout(TRUE)

        if (detached) {
            ## Replaces the handler installed above: a detached job exits
            ## with code 1 on failure without sending anything.
            on.exit(mcexit(1L))
            eval(expr, caller)
            mcexit(0L)
        }

        ## Errors raised by 'expr' are captured by try() and sent as the
        ## result.  The try() call is deliberately left as an (unevaluated)
        ## argument so that sendMaster() decides when it is forced.
        sendMaster(try(eval(expr, caller), silent = TRUE), FALSE)
        mcexit(0L)
    }

    if (!missing(name) && !is.null(name)) job$name <- as.character(name)[1L]
    class(job) <- c("parallelJob", class(job))
    job
}

## Collect results from parallel jobs.
##
## jobs          a "process" object, a list of such objects, or a numeric
##               vector of process IDs; defaults to children() when missing.
## wait          if TRUE, keep reading until readChild() has returned a
##               non-raw value for every job; if FALSE, poll once with
##               selectChildren() and read whatever is ready.
## timeout       passed to selectChildren() when 'wait' is FALSE; not used
##               when 'wait' is TRUE.
## intermediate  TRUE is replaced by utils::str; a function is called with
##               the list of results gathered so far after every batch of
##               reads in waiting mode.
##
## Value: NULL when there is nothing to collect (or the non-waiting poll
## yields no process IDs); otherwise a list of results named by job name
## where available and by process ID otherwise.  Jobs that never delivered a
## result keep a NULL entry and trigger a warning.
##
## Delivery is tracked per job rather than per message: a job that reports
## several results before finishing counts once, so it can no longer hide
## another job that reported nothing.
mccollect <- function(jobs, wait = TRUE, timeout = 0, intermediate = FALSE)
{
    if (missing(jobs)) jobs <- children()
    if (!length(jobs)) return (NULL)
    if (isTRUE(intermediate)) intermediate <- utils::str
    pids <- if (inherits(jobs, "process") || is.list(jobs))
        processID(jobs) else jobs
    if (!length(pids)) return(NULL)
    if (!is.numeric(pids)) stop("invalid 'jobs' argument")
    pids <- as.integer(pids)

    ## result names: the job's own name if it has one, else its pid
    pnames <- as.character(pids)
    if (!inherits(jobs, "process") && is.list(jobs))
        for(i in seq_along(jobs))
            if (!is.null(jobs[[i]]$name))
                pnames[i] <- as.character(jobs[[i]]$name)

    if (!wait) {
        s <- selectChildren(jobs, timeout)
        ## a logical value carries no process IDs; empty means nothing ready
        if (is.logical(s) || !length(s)) return(NULL)
        res <- lapply(s, function(x) NULL)
        delivered <- logical(length(s))
        for (i in seq_along(s)) {
            x <- s[i]
            r <- readChild(x)
            if (is.raw(r)) {
                rmChild(x) ## avoid zombie process without waiting
                ## unserialize(r) might be NULL, hence the list() wrapper
                res[i] <- list(unserialize(r))
                delivered[i] <- TRUE
            }
        }
        names(res) <- pnames[match(s, pids)]
    } else {
        res <- lapply(pids, function(x) NULL)
        names(res) <- pnames
        fin <- rep(FALSE, length(pids))
        delivered <- logical(length(pids))
        while (!all(fin)) {
            s <- selectChildren(pids[!fin], -1)
            if (is.integer(s)) {
                for (pid in s) {
                    r <- readChild(pid)
                    if (is.raw(r)) {
                        slot <- pid == pids
                        ## unserialize(r) might be NULL, hence the list()
                        ## wrapper; a later message overwrites an earlier one
                        res[which(slot)] <- list(unserialize(r))
                        delivered[slot] <- TRUE
                    } else
                        ## child exiting or error
                        fin[pid == pids] <- TRUE
                }
                if (is.function(intermediate)) intermediate(res)
            } else
                ## should not happen (select error): give up once none of
                ## the requested pids is among the current children
                if (all(is.na(match(pids, processID(children()))))) break
        }
    }

    nores <- sum(!delivered)
    if (nores > 0)
        warning(sprintf(ngettext(nores,
                                 "%d parallel job did not deliver a result",
                                 "%d parallel jobs did not deliver results"),
                        nores),
                domain = NA)
    cleanup(kill = FALSE, detach = FALSE) # compact children
    res
}
